Note: I have written a series of articles on Ambari; you can find them in the blog's post history.

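This post is long but dense with practical material. It covers four topics: Ambari-server in detail, how to debug the ambari-server source code, an analysis of the development flow, and a custom development example, with the aim of helping readers get comfortable working with ambari-server.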

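I. Ambari-server in Detail

1. Introduction

Ambari-Server is a web server that exposes a unified REST API. It opens separate ports for the web UI and for agents (by default 8080 for the former, and 8440 or 8441 for the latter). It runs inside a Jetty server container and is built with the Spring Framework, while dependency injection is handled largely through Google Guice annotations. The REST layer is built on the JAX-RS standard.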

2. Directory layout

Package                                        Description
org.apache.ambari.server.api.services          Entry points for the web API; handles /api/v1/* requests
org.apache.ambari.server.controller            Cluster management in Ambari, such as adding a host, updating a service, or deleting a component
org.apache.ambari.server.controller.internal   Mainly holds the ResourceProvider and PropertyProvider implementations
org.apache.ambari.server.orm.*                 Database access
org.apache.ambari.server.agent.rest            Entry points for the agent-facing API
org.apache.ambari.server.security              Permission management based on Spring Security

3. Resource

Each Resource type has a corresponding ResourceProvider. The mapping is as follows:

Resource.Type ResourceProvider
Workflow WorkflowResourceProvider
Job JobResourceProvider
TaskAttempt TaskAttemptResourceProvider
View ViewResourceProvider
ViewInstance ViewInstanceResourceProvider
Blueprint BlueprintResourceProvider
Cluster ClusterResourceProvider
Service ServiceResourceProvider
Component ComponentResourceProvider
Host HostResourceProvider
HostComponent HostComponentResourceProvider
Configuration ConfigurationResourceProvider
Action ActionResourceProvider
Request RequestResourceProvider
Task TaskResourceProvider
User UserResourceProvider
Stack StackResourceProvider
StackVersion StackVersionResourceProvider
StackService StackServiceResourceProvider
StackServiceComponent StackServiceComponentResourceProvider
StackConfiguration StackConfigurationResourceProvider
OperatingSystem OperatingSystemResourceProvider
Repository RepositoryResourceProvider
RootService RootServiceResourceProvider
RootServiceComponent RootServiceComponentResourceProvider
RootServiceHostComponent RootServiceHostComponentResourceProvider
ConfigGroup ConfigGroupResourceProvider
RequestSchedule RequestScheduleResourceProvider

All data handling for a resource is implemented in its xxxResourceProvider.java class.

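4. Data retrieval flow

(1) The Jersey endpoint receives the request and creates a ResourceInstance;

(2) The HTTP request is parsed into a Request object, and its process() method handles the rest;

(3) The request parses the URL or the HTTP body into a Predicate object;

(4) A handler is chosen by HTTP method; a GET request maps to ReadHandler;

(5) The handler adds paging, Render, Predicate and similar attributes to the Query object, then calls query.execute();

(6) The ResourceProvider matching the Resource.Type is looked up, and its getResources() method returns a Set<Resource>;

(7) The matching PropertyProvider is called to populate each Resource;

(8) The result is processed and returned as JSON.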
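
Steps (5) to (8) can be pictured with a small toy model. The sketch below is only an illustration under stated assumptions: ReadFlowSketch, ToyProvider and the sample data are hypothetical names invented for this example, not Ambari's real classes. It shows the general idea of looking up a provider by resource type, filtering with a predicate, and keeping only the requested properties.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

// Toy model of the read flow; these are NOT Ambari's real classes.
class ReadFlowSketch {

  // Hypothetical stand-in for a ResourceProvider: returns every resource of one type.
  interface ToyProvider {
    List<Map<String, Object>> getResources();
  }

  // Builds one sample "user" resource as a property-id -> value map.
  static Map<String, Object> user(String name, boolean active) {
    Map<String, Object> resource = new LinkedHashMap<>();
    resource.put("Users/user_name", name);
    resource.put("Users/active", active);
    return resource;
  }

  // Registers a provider for the (sample) "User" type.
  static Map<String, ToyProvider> sampleProviders() {
    final List<Map<String, Object>> users =
        Arrays.asList(user("admin", true), user("guest", false));
    Map<String, ToyProvider> providers = new HashMap<>();
    providers.put("User", () -> users);
    return providers;
  }

  // Stand-in for query.execute(): find the provider for the type, keep the
  // resources that satisfy the predicate, then keep only the requested properties.
  // An empty requestedIds set means "return every property".
  static List<Map<String, Object>> execute(Map<String, ToyProvider> providers,
                                           String type,
                                           Predicate<Map<String, Object>> predicate,
                                           Set<String> requestedIds) {
    ToyProvider provider = providers.get(type);
    if (provider == null) {
      throw new IllegalArgumentException("No provider registered for type " + type);
    }
    List<Map<String, Object>> result = new ArrayList<>();
    for (Map<String, Object> resource : provider.getResources()) {
      if (!predicate.test(resource)) {
        continue;
      }
      Map<String, Object> projected = new LinkedHashMap<>();
      for (Map.Entry<String, Object> entry : resource.entrySet()) {
        if (requestedIds.isEmpty() || requestedIds.contains(entry.getKey())) {
          projected.put(entry.getKey(), entry.getValue());
        }
      }
      result.add(projected);
    }
    return result;
  }

  public static void main(String[] args) {
    Set<String> requested = Collections.singleton("Users/user_name");
    System.out.println(execute(sampleProviders(), "User",
        r -> "admin".equals(r.get("Users/user_name")), requested));
  }
}
```

In the real server the predicate comes from the URL or body, and the projection is driven by the requested property IDs, but the shape of the lookup is similar.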

II. Debugging Ambari-server

1. Stop the ambari-server service

ambari-server stop

2. Start ambari-server in debug mode

java -server -Xdebug -Xrunjdwp:transport=dt_socket,suspend=n,server=y,address=5005 -XX:NewRatio=3 -XX:+UseConcMarkSweepGC -XX:-UseGCOverheadLimit -XX:CMSInitiatingOccupancyFraction=60 -XX:+CMSClassUnloadingEnabled -Dsun.zip.disableMemoryMapping=true -Xms1012m -Xmx3048m -XX:MaxPermSize=256m -Djava.security.auth.login.config=/etc/ambari-server/conf/krb5JAASLogin.conf -Djava.security.krb5.conf=/etc/krb5.conf -Djavax.security.auth.useSubjectCredsOnly=false -cp /etc/ambari-server/conf:/usr/lib/ambari-server/*:/usr/share/java/mysql-connector-java-5.1.45-bin.jar org.apache.ambari.server.controller.AmbariServer

3. Connect to Ambari-Server from IDEA

ambari-server ships with server-side debug parameters preconfigured, using port 5005. To change the port, edit /usr/sbin/ambari_server_main.py (in the Ambari source tree: ambari-server/src/main/python/ambari_server_main.py) and replace 5005 in the following definition:

SERVER_START_CMD_DEBUG = "{0} " \
    "-server -XX:NewRatio=2 " \
    "-XX:+UseConcMarkSweepGC " + \
    "{1} {2} " \
    " -Xdebug -Xrunjdwp:transport=dt_socket,address=5005," \
    "server=y,suspend={6} " \
    "-cp {3} " + \
    "org.apache.ambari.server.controller.AmbariServer " \
    "> {4} 2>&1 || echo $? > {5}"

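Set a breakpoint in the code you want to step through. For example, to trace http://172.16.0.142:8080/api/v1/users, set a breakpoint somewhere in the user-handling code path.

Click the Debug button in IDEA, then run the following in XShell (or any terminal): curl -u admin:admin http://172.16.0.142:8080/api/v1/users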
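
If you would rather trigger the breakpoint from Java than from curl, the same request can be sketched with the JDK's HttpURLConnection. This is a minimal sketch, assuming the host, port and admin:admin credentials from the curl command above; the class name AmbariUsersClient and its methods are hypothetical helpers written for this example.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper that mirrors `curl -u user:password <url>`.
class AmbariUsersClient {

  // Builds the HTTP Basic Authorization header value for the given credentials.
  static String basicAuthHeader(String user, String password) {
    String raw = user + ":" + password;
    return "Basic " + Base64.getEncoder().encodeToString(raw.getBytes(StandardCharsets.UTF_8));
  }

  // Sends a GET request with Basic auth and returns the response body as text.
  static String get(String url, String user, String password) throws IOException {
    HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
    conn.setRequestMethod("GET");
    conn.setRequestProperty("Authorization", basicAuthHeader(user, password));
    StringBuilder body = new StringBuilder();
    try (BufferedReader in = new BufferedReader(
        new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
      String line;
      while ((line = in.readLine()) != null) {
        body.append(line).append('\n');
      }
    } finally {
      conn.disconnect();
    }
    return body.toString();
  }

  public static void main(String[] args) throws IOException {
    // Address and credentials are the ones used in the curl example.
    System.out.println(get("http://172.16.0.142:8080/api/v1/users", "admin", "admin"));
  }
}
```

While the server is suspended on a breakpoint the call simply blocks until you resume execution in the IDE.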

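In debug mode, a handful of buttons are used most often. From top to bottom and left to right they are: Resume Program (run to the next breakpoint), View Breakpoints, Mute Breakpoints, Step Over, Step Into, and Step Out.

Once you have traced a flow and want to leave debug mode, press Ctrl+C to terminate the debug process. This also stops ambari-server.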

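III. Development Flow Analysis

This section uses GET /api/v1/users, which returns all users, as the example.
Other resource requests follow the same pattern, so understanding one is enough to understand the rest.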

IV. Development Flow Example

1. Write the entity class (Entity)

Path: src/main/java/org/apache/ambari/server/orm/entities/AuditlogEntity.java

The entity maps to the auditlog_table table. Add the @Table, @Entity, @Id and @Column annotations, declare the id and note properties, and add getter/setter methods.

CREATE TABLE `auditlog_table` (
  `Id` int(11) NOT NULL AUTO_INCREMENT,
  `level` varchar(255) NOT NULL DEFAULT '' COMMENT 'priority',
  `category` varchar(255) NOT NULL DEFAULT '' COMMENT 'category',
  `thread` varchar(255) NOT NULL DEFAULT '' COMMENT 'thread',
  `time` varchar(30) NOT NULL DEFAULT '' COMMENT 'time',
  `location` varchar(255) NOT NULL DEFAULT '' COMMENT 'location',
  `note` text COMMENT 'log message',
  PRIMARY KEY (`Id`)
);

As the last part of this step, register AuditlogEntity in /src/main/resources/META-INF/persistence.xml.

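2. Write the DAO layer

Path: src/main/java/org/apache/ambari/server/orm/dao/AuditlogDAO.java

Queries must follow the JPA specification, which means writing them in JPQL rather than plain SQL.

The query passed to createQuery() is SELECT auditlog FROM AuditlogEntity auditlog. AuditlogEntity must match the entity class name, and the alias auditlog must be the same in both places. The query is then translated into SQL equivalent to SELECT * FROM auditlog_table. To refer to a single attribute such as id, write auditlog.id.

3. Write the service layer

Path: src/main/java/org/apache/ambari/server/api/services/AuditlogService.java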

3. Declare the Resource type

1) Resource.java

Path: org.apache.ambari.server.controller.spi.Resource.java

enum InternalType {
  ...,
  Auditlog,
  ...
}

final class Type implements Comparable<Type> {
  ...,
  public static final Type Auditlog = InternalType.Auditlog.getType();
  ...
}

2) AuditlogResourceDefinition.java

Path: org.apache.ambari.server.api.resources.AuditlogResourceDefinition.java

public class AuditlogResourceDefinition extends BaseResourceDefinition {

  /**
   * Constructor.
   */
  public AuditlogResourceDefinition() {
    super(Resource.Type.Auditlog);
  }

  // Name used for the collection of resources.
  @Override
  public String getPluralName() {
    return "auditlogs";
  }

  // Name used for a single resource.
  @Override
  public String getSingularName() {
    return "auditlog";
  }
}

3) ResourceInstanceFactoryImpl.java

Path: org.apache.ambari.server.api.resources.ResourceInstanceFactoryImpl.java

case Auditlog:
  resourceDefinition = new AuditlogResourceDefinition();
  break;

4) AbstractControllerResourceProvider.java

Path: org.apache.ambari.server.controller.internal.AbstractControllerResourceProvider.java

case Auditlog:
  return new AuditlogResourceProvider(propertyIds, keyPropertyIds, managementController);

4. Add the property values to the JSON files

1) key_properties.json

# Path: src/main/resources/key_properties.json
"Auditlog": {
  "Auditlog": "auditlog/id",
  "Stack": "auditlog/time",
  "User": "auditlog/user",
  "Host": "auditlog/status",
  "Task": "auditlog/operation",
  "Group": "auditlog/remoteIp",
  "Cluster": "auditlog/note"
},
# Note: each key must be a Resource.Type value; each value is the JSON key you want in the output.

2) properties.json

# Path: src/main/resources/properties.json
"Auditlog":[
  "auditlog/id",
  "auditlog/time",
  "auditlog/user",
  "auditlog/status",
  "auditlog/operation",
  "auditlog/remoteIp",
  "auditlog/note",
  "_"
],
# Note: this array must not contain more entries than key_properties.json.

Note: key_properties.json must contain everything listed in properties.json; ClusterControllerImpl.checkProperties() performs the corresponding check.

5. Write the ResourceProvider

Path: org.apache.ambari.server.controller.internal.AuditlogResourceProvider.java

1) Extend the AbstractControllerResourceProvider class

public class AuditlogResourceProvider extends AbstractControllerResourceProvider {
  ...
}

2) Define the constants

// Property IDs in the form "auditlog/<name>", matching properties.json.
private static final String AUDIT_ID_PROPERTY_ID = PropertyHelper.getPropertyId("auditlog", "id");
private static final String AUDIT_TIME_PROPERTY_ID = PropertyHelper.getPropertyId("auditlog", "time");
private static final String AUDIT_USER_PROPERTY_ID = PropertyHelper.getPropertyId("auditlog", "user");
private static final String AUDIT_STATUS_PROPERTY_ID = PropertyHelper.getPropertyId("auditlog", "status");
private static final String AUDIT_OPERATION_PROPERTY_ID = PropertyHelper.getPropertyId("auditlog", "operation");
private static final String AUDIT_REMOTEIP_PROPERTY_ID = PropertyHelper.getPropertyId("auditlog", "remoteIp");
private static final String AUDIT_NOTE_PROPERTY_ID = PropertyHelper.getPropertyId("auditlog", "note");

// The primary key of an Auditlog resource is its id.
private static Set<String> pkPropertyIds =
    new HashSet<>(Arrays.asList(AUDIT_ID_PROPERTY_ID));

3) Create the constructor

AuditlogResourceProvider(Set<String> propertyIds,
                         Map<Resource.Type, String> keyPropertyIds,
                         AmbariManagementController managementController) {
  super(propertyIds, keyPropertyIds, managementController);

  // Creating and deleting require the AMBARI_MANAGE_USERS authorization.
  setRequiredCreateAuthorizations(EnumSet.of(RoleAuthorization.AMBARI_MANAGE_USERS));
  setRequiredDeleteAuthorizations(EnumSet.of(RoleAuthorization.AMBARI_MANAGE_USERS));
}

4) getResources()

@Override
public Set<Resource> getResources(Request request, Predicate predicate)
    throws SystemException, UnsupportedPropertyException,
           NoSuchResourceException, NoSuchParentResourceException {

  // Build one AuditlogRequest per property map derived from the predicate.
  final Set<AuditlogRequest> requests = new HashSet<>();
  if (predicate == null) {
    requests.add(getRequest(null));
  } else {
    for (Map<String, Object> propertyMap : getPropertyMaps(predicate)) {
      requests.add(getRequest(propertyMap));
    }
  }

  // Delegate the lookup to the management controller.
  Set<AuditlogResponse> responses = getResources(new Command<Set<AuditlogResponse>>() {
    @Override
    public Set<AuditlogResponse> invoke() throws AmbariException, AuthorizationException {
      return getManagementController().getAuditlog(requests);
    }
  });

  if (LOG.isDebugEnabled()) {
    LOG.debug("Found audit logs matching getAuditlog request"
        + ", auditlogResponseCount=" + responses.size());
  }

  Set<String> requestedIds = getRequestPropertyIds(request, predicate);
  Set<Resource> resources = new HashSet<>();

  for (AuditlogResponse auditlogResponse : responses) {
    ResourceImpl resource = new ResourceImpl(Resource.Type.Auditlog);

    // A note looks like:
    // "<timestamp>, User(..), RemoteIp(..), Operation(..), Roles(..), Status(..)"
    String note = auditlogResponse.getNote();
    String[] sourceNoteArray = note.split(", ");
    // "2018-06-22T17:57:06.894-0700" becomes "2018-06-22 17:57:06".
    String time = sourceNoteArray[0].replace("T", " ").replaceAll("\\..*", "");
    String user = sourceNoteArray[1].replace("User(", "").replace(")", "");
    String status = note.split("Status\\(")[1].split("\\)")[0];

    // Operation and RemoteIp are optional; fall back to an empty string.
    String operation = note.contains("Operation")
        ? note.split("Operation\\(")[1].split("\\),")[0]
        : "";
    String remoteIp = note.contains("RemoteIp")
        ? note.split("RemoteIp\\(")[1].split("\\)")[0]
        : "";

    // setResourceProperty() checks whether requestedIds contains the property ID;
    // if it does, the value is stored in the resource, otherwise it is skipped.
    setResourceProperty(resource, AUDIT_ID_PROPERTY_ID, auditlogResponse.getId(), requestedIds);
    setResourceProperty(resource, AUDIT_TIME_PROPERTY_ID, time, requestedIds);
    setResourceProperty(resource, AUDIT_USER_PROPERTY_ID, user, requestedIds);
    setResourceProperty(resource, AUDIT_STATUS_PROPERTY_ID, status, requestedIds);
    setResourceProperty(resource, AUDIT_OPERATION_PROPERTY_ID, operation, requestedIds);
    setResourceProperty(resource, AUDIT_REMOTEIP_PROPERTY_ID, remoteIp, requestedIds);
    setResourceProperty(resource, AUDIT_NOTE_PROPERTY_ID, note, requestedIds);

    resources.add(resource);
  }
  return resources;
}
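
The note parsing inside getResources() is easier to check in isolation. The sketch below is a standalone illustration, not part of the provider: AuditNoteParser and its methods are hypothetical names, and it swaps the chained split() calls for a regular expression that also tolerates missing fields. The sample note in main is modeled on the REST output shown in section 9, with the role text written in ASCII.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical standalone parser for audit notes of the form
// "<timestamp>, User(..), RemoteIp(..), Operation(..), Roles(..), Status(..)".
class AuditNoteParser {

  // Returns the text inside "Name(...)", or "" when the field is absent.
  // Assumes the field value itself contains no ')' character.
  static String field(String note, String name) {
    Pattern pattern = Pattern.compile("(?:^|, )" + Pattern.quote(name) + "\\(([^)]*)\\)");
    Matcher matcher = pattern.matcher(note);
    return matcher.find() ? matcher.group(1) : "";
  }

  // Extracts the same five values that getResources() derives from a note.
  static Map<String, String> parse(String note) {
    Map<String, String> result = new LinkedHashMap<>();
    // The timestamp is the first comma-separated token; drop the 'T' and the fraction/zone.
    String time = note.split(", ")[0].replace("T", " ").replaceAll("\\..*", "");
    result.put("time", time);
    result.put("user", field(note, "User"));
    result.put("status", field(note, "Status"));
    result.put("operation", field(note, "Operation"));
    result.put("remoteIp", field(note, "RemoteIp"));
    return result;
  }

  public static void main(String[] args) {
    String note = "2018-06-22T17:57:06.894-0700, User(admin), RemoteIp(172.16.0.167), "
        + "Operation(User login), Roles( Ambari: Administrator ), Status(Success)";
    System.out.println(parse(note));
  }
}
```

Keeping the parsing in a small helper like this would also make it straightforward to unit test without starting the server.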
5) Other methods in the class

/**
 * ----- ResourceProvider ------------------------------------------------
 */

@Override
protected Set<String> getPKPropertyIds() {
  return pkPropertyIds;
}

// Converts a property map derived from the predicate into an AuditlogRequest.
// A null map means "no filter".
private AuditlogRequest getRequest(Map<String, Object> properties) {
  if (properties == null) {
    return new AuditlogRequest(null);
  }

  AuditlogRequest request = new AuditlogRequest(
      (String) properties.get(AUDIT_NOTE_PROPERTY_ID)
  );

  request.setId((Long) properties.get(AUDIT_ID_PROPERTY_ID));
  request.setNote((String) properties.get(AUDIT_NOTE_PROPERTY_ID));

  return request;
}

6. Create the AuditlogRequest class

Path: src/main/java/org/apache/ambari/server/controller/AuditlogRequest.java

7. Create the AuditlogResponse class

Path: src/main/java/org/apache/ambari/server/controller/AuditlogResponse.java
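
The post does not list these two classes, so here is a minimal sketch of what they might look like, inferred only from how the surrounding code uses them: a constructor taking the note, setId()/setNote()/getNote() on the request, and a constructor taking id and note plus getId()/getNote() on the response. Using Long for the id and adding equals()/hashCode() are assumptions. Both classes are shown package-private in one snippet so the example compiles on its own; in the project each would be a public class in its own file under org.apache.ambari.server.controller.

```java
import java.util.Objects;

// Sketch of the request object: carries the optional filter values from the predicate.
class AuditlogRequest {
  private Long id;
  private String note;

  public AuditlogRequest(String note) {
    this.note = note;
  }

  public Long getId() { return id; }
  public void setId(Long id) { this.id = id; }
  public String getNote() { return note; }
  public void setNote(String note) { this.note = note; }

  @Override
  public String toString() {
    return "AuditlogRequest{id=" + id + ", note=" + note + "}";
  }
}

// Sketch of the response object: one row read from auditlog_table.
class AuditlogResponse {
  private final Long id;
  private final String note;

  public AuditlogResponse(Long id, String note) {
    this.id = id;
    this.note = note;
  }

  public Long getId() { return id; }
  public String getNote() { return note; }

  // Responses are collected in a HashSet, so equality is defined on id and note
  // (an assumption of this sketch) to avoid duplicate rows.
  @Override
  public boolean equals(Object other) {
    if (this == other) return true;
    if (!(other instanceof AuditlogResponse)) return false;
    AuditlogResponse that = (AuditlogResponse) other;
    return Objects.equals(id, that.id) && Objects.equals(note, that.note);
  }

  @Override
  public int hashCode() {
    return Objects.hash(id, note);
  }
}
```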

8. Add getAuditlog()

// Declare getAuditlog() in AmbariManagementController
// Path: org.apache.ambari.server.controller.AmbariManagementController.java
Set<AuditlogResponse> getAuditlog(Set<AuditlogRequest> requests)
    throws AmbariException, AuthorizationException;

// Implement getAuditlog() in AmbariManagementControllerImpl
// Path: org.apache.ambari.server.controller.AmbariManagementControllerImpl.java
@Override
public Set<AuditlogResponse> getAuditlog(Set<AuditlogRequest> requests)
    throws AmbariException, AuthorizationException {
  Set<AuditlogResponse> responses = new HashSet<>();

  for (AuditlogRequest r : requests) {

    if (LOG.isDebugEnabled()) {
      LOG.debug("Received a getAuditlog request"
          + ", auditlogRequest=" + r.toString());
    }

    // requestedNote is read but not used for filtering here:
    // every request returns all rows from the DAO.
    String requestedNote = r.getNote();
    for (AuditlogEntity u : auditlogDAO.findAll()) {
      AuditlogResponse resp = new AuditlogResponse(u.getId(), u.getNote());
      responses.add(resp);
    }
  }
  return responses;
}
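
The implementation above reads the note from each request but does not filter by it. If you wanted to honor that value, one possible approach is sketched below. This is only an illustration: NoteFilterSketch and filterByNote are hypothetical names, plain strings stand in for the entities, and the "contains" matching rule is an assumption rather than something the original code defines.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper showing one way to apply the requested note as a filter.
class NoteFilterSketch {

  // Returns all notes when requestedNote is null; otherwise only notes containing it.
  static List<String> filterByNote(List<String> allNotes, String requestedNote) {
    List<String> matches = new ArrayList<>();
    for (String note : allNotes) {
      if (requestedNote == null || (note != null && note.contains(requestedNote))) {
        matches.add(note);
      }
    }
    return matches;
  }
}
```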

9. REST API output

{
  "href": "http://172.16.0.142:8080/api/v1/log/getAuditLog",
  "items": [
    {
      "href": "http://172.16.0.142:8080/api/v1/log/getAuditLog/2",
      "auditlog": {
        "id": 2,
        "note": "2018-06-22T17:57:06.894-0700, User(admin), RemoteIp(172.16.0.167), Operation(User login), Roles( Ambari: 管理员 ), Status(Success)",
        "operation": "User login",
        "remoteIp": "172.16.0.167",
        "status": "Success",
        "time": "2018-06-22 17:57:06",
        "user": "admin"
      }
    },
    {
      "href": "http://172.16.0.142:8080/api/v1/log/getAuditLog/1855",
      "auditlog": {
        "id": 1855,
        "note": "2018-07-12T18:51:00.846+0800, User(admin), RemoteIp(172.16.0.142), Operation(User login), Roles( Ambari: 管理员 ), Status(Success)",
        "operation": "User login",
        "remoteIp": "172.16.0.142",
        "status": "Success",
        "time": "2018-07-12 18:51:00",
        "user": "admin"
      }
    }
  ]
}